import MsgModel.UserBehavior.UserBehaviorMsg;
import Processer._RecordDispatcher;
import Util.Const;
import Util.DBOffsets;
import Util.MongoUtil;
import Util.Topic;
import com.alibaba.fastjson.JSON;
import com.mongodb.DBCollection;
import com.mongodb.spark.MongoSpark;
import kafka.MyKafkaUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.bson.Document;
import scala.Tuple2;

import java.util.*;

/**
 * Consumes messages from one or more Kafka topics and processes them.
 * Usage: UserBehaviorLog
 * <p>
 * Example:
 * $ bin/spark-submit
 * --master spark://host:port
 * --deploy-mode cluster
 * --executor-memory 1g
 * --class UserBehaviorLog  BobolCalculator/BobolStreamingCalculator.jar
 */

/**
 * Spark Streaming job that consumes user-behavior log messages from Kafka,
 * persists the raw JSON records to MongoDB, tracks consumer offsets in
 * MongoDB, and dispatches deserialized {@link UserBehaviorMsg} records to
 * the record dispatcher, keyed by behavior type.
 */
public final class UserBehaviorLog {
    private static final Logger logger = Logger.getLogger("mylog");

    /** Utility entry-point class; not meant to be instantiated. */
    private UserBehaviorLog() {
    }

    public static void main(String[] args) throws Exception {
        final String broker = Const.BROKER_HOST;
        final String topic = Topic.USER_BEHAVIOR_LOG;
        final String consumerGroup = Topic.USER_BEHAVIOR_LOG + "_group";
        final int partition = 0;

        // Create context with a 5-second batch interval. The Mongo URIs are
        // read by MongoSpark.save below.
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaDirectKafka_" + topic)
                .set("spark.mongodb.input.uri", Const.Mongo_URI_DEFAULT)
                .set("spark.mongodb.output.uri", Const.Mongo_URI_DEFAULT);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        jssc.checkpoint("./checkpoint/" + topic);

        Collection<String> topics = Collections.singletonList(topic);
        Map<String, Object> kafkaParams = MyKafkaUtil.GetKafkaParams(broker, consumerGroup);

        final JavaInputDStream<ConsumerRecord<String, String>> messages;
        // Fetch previously committed offsets for this group/topic/partition from MongoDB.
        Map<TopicPartition, Long> consumerDBOffsetInfo =
                DBOffsets.GetDBConsumerOffset(broker, consumerGroup, topic, partition);
        Map<TopicPartition, Long> consumerOffsetMap =
                MyKafkaUtil.GetConsumerOffset(new TopicPartition(topic, partition), consumerDBOffsetInfo);
        if (consumerOffsetMap == null) {
            // No offsets recorded in MongoDB: start consuming from the earliest offset.
            kafkaParams.put("auto.offset.reset", "earliest");
            messages =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                    );
        } else {
            // Resume the direct stream from the offsets recorded in MongoDB.
            messages =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, consumerOffsetMap)
                    );
        }

        // Persist each micro-batch to MongoDB, then commit its offsets.
        messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {

            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // Alternative: async commit to Kafka itself (idempotency issue remains;
                // a transactional write of results + offsets to the DB would solve it):
                // ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);

                // Convert each raw JSON record into a BSON Document.
                JavaRDD<Document> documents = rdd.map(new Function<ConsumerRecord<String, String>, Document>() {
                    @Override
                    public Document call(ConsumerRecord<String, String> record) throws Exception {
                        return Document.parse(record.value());
                    }
                });

                // Save the batch BEFORE committing offsets. Committing only after a
                // successful save yields at-least-once semantics (a failed save is
                // re-processed) instead of losing data when the save fails after the
                // offsets were already advanced. Duplicates remain possible; writing
                // data and offsets in one DB transaction would remove them.
                MongoSpark.save(documents);

                // Insert or update the stored offset for each partition of this batch.
                for (OffsetRange item : offsetRanges) {
                    DBOffsets.SaveDBOffset(Const.BROKER_HOST, consumerGroup, item);
                    System.out.println(
                            "ConsumerInfo: consumerGroup:" + consumerGroup
                                    + ", topic:" + item.topic()
                                    + ", partition:" + item.partition()
                                    + ", fromOffset:" + item.fromOffset()
                                    + ", untilOffset:" + item.untilOffset());
                }

                // Per-partition diagnostic logging of the offset range handled by each task.
                rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                    @Override
                    public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
                        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
                        System.out.println(
                                "foreachInfo:mypartition:" + o.topic() + ",partition: " + o.partition() + " ,fromOffset:" + o.fromOffset() + ",untilOffset: " + o.untilOffset());
                    }
                });
            }
        });

        // Deserialize each record into a UserBehaviorMsg and key it by behavior type.
        JavaPairDStream<String, UserBehaviorMsg> msgPairsStream = messages.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, UserBehaviorMsg>() {
                    @Override
                    public Tuple2<String, UserBehaviorMsg> call(ConsumerRecord<String, String> record) {
                        UserBehaviorMsg behaviorMsg = JSON.parseObject(record.value(), UserBehaviorMsg.class);
                        return new Tuple2<>(behaviorMsg.getBehavior(), behaviorMsg);
                    }
                });
        _RecordDispatcher.Dispatch(msgPairsStream);

        // Start the streaming computation and block until termination.
        jssc.start();
        jssc.awaitTermination();
    }
}